//---------------------------------------------------------- -*- Mode: C++ -*-
// $Id$
//
// Created 2006/03/23
// Author: Sriram Rao
//
// Copyright 2008 Quantcast Corp.
// Copyright 2006-2008 Kosmix Corp.
//
// This file is part of Kosmos File System (KFS).
//
// Licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// 
//----------------------------------------------------------------------------

#include "ClientSM.h"

#include "ChunkManager.h"
#include "ChunkServer.h"
#include "Utils.h"
#include "KfsOps.h"

#include <string>
#include <sstream>
using std::string;
using std::ostringstream;

#include "common/log.h"
#include "libkfsIO/Globals.h"

#include <boost/scoped_array.hpp>
using boost::scoped_array;

using namespace KFS;
using namespace KFS::libkfsio;

// Read-ahead limit (1 KiB) applied to a connection while it is expected to
// deliver a request header rather than a write payload.
const int kMaxCmdHeaderLength = 1024;

///
/// Bind a new client state machine to an accepted connection.  The machine
/// starts in HandleRequest, reads ahead at most one header's worth of data
/// and uses the client manager's idle timeout.
/// @param[in] conn The client connection this object services.
///
ClientSM::ClientSM(NetConnectionPtr &conn)
    : mNetConnection(conn), mCurOp(0)
{
    // Every event is dispatched to the request handler until teardown.
    SET_HANDLER(this, &ClientSM::HandleRequest);
    // Until a write payload is announced, only a request header is expected.
    mNetConnection->SetMaxReadAhead(kMaxCmdHeaderLength);
    mNetConnection->SetInactivityTimeout(gClientManager.GetIdleTimeoutSec());
}

///
/// Destroy the state machine.  Every queued op is expected to have been
/// retired already; any that remain are freed here so they do not leak.
/// Finally, this object is unregistered from the client manager.
///
ClientSM::~ClientSM()
{
    assert(mOps.empty());
    // Only reachable with a non-empty queue when assertions are disabled.
    while (!mOps.empty()) {
        KfsOp * const leftover = mOps.front();
        mOps.pop_front();
        delete leftover;
    }
    gClientManager.Remove(this);
}

///
/// Send out the response to the client request.  The response header is
/// produced by the op itself (op->Response()).  For successful reads and
/// chunk-metadata requests, the op's data buffer is written after it.
/// Slow RPCs (> 0.2s), write-sync acks and completed reads are logged.
/// @param[in] op The request for which we finished execution.
///
void
ClientSM::SendResponse(KfsOp *op)
{
    IOBuffer::OStream os;
    const string s = op->Show();
    const string clientIP = mNetConnection->GetPeerName();

#ifdef DEBUG
    verifyExecutingOnNetProcessor();
#endif
    op->Response(os);

    // Measure the RPC time once; both the slow-RPC log and the read log
    // below report it (previously the read branch re-measured it through
    // shadowing locals).
    struct timeval now;
    gettimeofday(&now, NULL);
    const float timespent = ComputeTimeDiff(op->startTime, now);

    KFS_LOG_VA_DEBUG("Client %s, Command %s: Response status: %d\n",
                     clientIP.c_str(), s.c_str(), op->status);

    mNetConnection->Write(&os);

    if (timespent > 0.2) {
        KFS_LOG_STREAM_INFO << "RPC too long: Client: " << clientIP << ": "
                            << op->Show() <<  " : RPC time: " << timespent << KFS_LOG_EOM;
    }

    if (op->op == CMD_WRITE_SYNC) {
        KFS_LOG_STREAM_INFO <<
            "Ack'ing to " << clientIP << ": " << op->Show() <<
            ", status = " << op->status <<
        KFS_LOG_EOM;
    }

    if (op->op == CMD_READ) {
        // need to send out the data read
        ReadOp * const rop = static_cast<ReadOp *>(op);

        KFS_LOG_VA_INFO("Client: %s, Read done: %s, status = %d (RPC time = %.3f)", clientIP.c_str(),
                        rop->Show().c_str(), rop->status, timespent);
        if (op->status >= 0) {
            // On success, status carries the number of bytes read.
            assert(rop->dataBuf->BytesConsumable() == rop->status);
            mNetConnection->Write(rop->dataBuf, rop->numBytesIO);
        }
    } else if (op->op == CMD_GET_CHUNK_METADATA) {
        GetChunkMetadataOp * const gcm = static_cast<GetChunkMetadataOp *>(op);
        if (op->status >= 0)
            mNetConnection->Write(gcm->dataBuf, gcm->numBytesIO);
    }
}

///
/// Generic event handler.  Decode the event that occurred and
/// appropriately extract out the data and deal with the event.
/// @param[in] code: The type of event that occurred
/// @param[in] data: Data being passed in relative to the event that
/// occurred.
/// @retval 0 to indicate successful event handling; -1 otherwise.
///
int
ClientSM::HandleRequest(int code, void *data)
{
    IOBuffer *iobuf;
    KfsOp *op;
    int cmdLen = 0;

#ifdef DEBUG
    verifyExecutingOnNetProcessor();
#endif    

    switch (code) {
    case EVENT_NET_READ:
	// We read something from the network.  Run the RPC that
	// came in.
	iobuf = (IOBuffer *) data;
	while (mCurOp || IsMsgAvail(iobuf, &cmdLen)) {
	    // if we don't have all the data for the command, wait
	    if (!HandleClientCmd(iobuf, cmdLen))
		break;
	}
        int hdrsz;
        if (! mCurOp &&
                (hdrsz = iobuf->BytesConsumable()) > MAX_RPC_HEADER_LEN) {
            KFS_LOG_VA_ERROR("exceeded max request header size: %d > %d,"
                " closing connection\n", (int)hdrsz, (int)MAX_RPC_HEADER_LEN);
            iobuf->Clear();
            HandleRequest(EVENT_NET_ERROR, NULL);
        }
	break;

    case EVENT_NET_WROTE:
	// Something went out on the network.  For now, we don't
	// track it. Later, we may use it for tracking throttling
	// and such.
	break;

    case EVENT_CMD_DONE:
	// An op finished execution.  Send response back in FIFO
        gChunkServer.OpFinished();
            
	op = (KfsOp *) data;
	op->done = true;
        assert(!mOps.empty());
	while (!mOps.empty()) {
	    KfsOp *qop = mOps.front();
	    if (!qop->done)
		break;
            if (mNetConnection)
                SendResponse(qop);
	    mOps.pop_front();
            OpFinished(qop);
	    delete qop;
	}
        if (mNetConnection)
            mNetConnection->StartFlush();
	break;


    case EVENT_INACTIVITY_TIMEOUT:
        {
            std::string ip;
            int nRead  = 0;
            int nWrite = 0;
            if (mNetConnection) {
                ip     = mNetConnection->GetPeerName();
                nRead  = mNetConnection->GetNumBytesToRead();
                nWrite = mNetConnection->GetNumBytesToWrite();
            } else {
                ip = "unknown";
            }
	    KFS_LOG_VA_INFO("Closing connection to peer: %s due to timeout"
                " pending read: %d write: %d", ip.c_str(), nRead, nWrite);
        }
        // Fall through
    case EVENT_NET_ERROR:

	if (mNetConnection) {
            string clientIP = mNetConnection->GetPeerName();
            KFS_LOG_VA_INFO("Closing connection from client %s", clientIP.c_str());
	    mNetConnection->Close();
        }

        // get rid of the connection to all the peers in daisy chain;
        // if there were any outstanding ops, they will all come back
        // to this method as EVENT_CMD_DONE and we clean them up above.
        ReleaseAllServers(mRemoteSyncers);

        // if there are any disk ops, wait for the ops to finish
        SET_HANDLER(this, &ClientSM::HandleTerminate);

        if (HandleTerminate(code, NULL) != 0) {
            // this was deleted, return now.
            return 0;
        }
	break;

    default:
	assert(!"Unknown event");
	break;
    }
    // Enforce 5 min timeout if connection has pending read and write.
    if (mNetConnection) {
        mNetConnection->SetInactivityTimeout(
            (mNetConnection->HasPendingRead() ||
                mNetConnection->IsWriteReady()) ?
            gClientManager.GetIoTimeoutSec() :
            gClientManager.GetIdleTimeoutSec());
    }
    return 0;
}

///
/// Termination handler.  For the client state machine, we could have
/// ops queued at the logger.  So, for cleanup wait for all the
/// outstanding ops to finish and then delete this.  In this state,
/// the only event that gets raised is that an op finished; anything
/// else is bad.
///
int
ClientSM::HandleTerminate(int code, void *data)
{
    KfsOp *op;

#ifdef DEBUG
    verifyExecutingOnNetProcessor();
#endif    

    switch (code) {
    case EVENT_CMD_DONE:
        gChunkServer.OpFinished();
	// An op finished execution.  Send a response back
	op = (KfsOp *) data;
	op->done = true;
	if (op != mOps.front())
	    break;
	while (!mOps.empty()) {
	    op = mOps.front();
	    if (!op->done)
		break;
            OpFinished(op);
	    // we are done with the op
	    mOps.pop_front();
	    delete op;
	}
	break;

    case EVENT_INACTIVITY_TIMEOUT:
    case EVENT_NET_ERROR:
        // clean things up
        break;

    default:
	assert(!"Unknown event");
	break;
    }

    if (mOps.empty()) {
        // all ops are done...so, now, we can nuke ourself.
        assert(mPendingOps.empty());
        if (mNetConnection) {
            mNetConnection->SetOwningKfsCallbackObj(0);
        }
        delete this;
        return 1;
    }
    return 0;
}

///
/// We have a command in a buffer.  It is possible that we don't have
/// everything we need to execute it (for example, for a write we may
/// not have received all the data the client promised).  So, parse
/// out the command and if we have everything execute it.
/// @param[in] iobuf  Connection input buffer holding the request.
/// @param[in] cmdLen Length of the request header at the front of iobuf;
///                   0 when continuing the partially received mCurOp.
/// @retval false if more data must arrive before the op can run (mCurOp
///               then holds the partially received write); true otherwise
///               (op submitted or parked, bogus header dropped, or the
///               connection torn down for an oversized write).
///
bool
ClientSM::HandleClientCmd(IOBuffer *iobuf,
                          int cmdLen)
{
    KfsOp *op = mCurOp;

    assert(op ? cmdLen == 0 : cmdLen > 0);
    if (! op) {
        IOBuffer::IStream is(*iobuf, cmdLen);
        if (ParseCommand(is, &op) != 0) {
            // got a bogus command: log its lines and drop the header
            assert(! op);
            is.Rewind(cmdLen);
            char buf[128];
            while (is.getline(buf, sizeof(buf))) {
                KFS_LOG_VA_DEBUG("Aye?: %s", buf);
            }
            iobuf->Consume(cmdLen);
            return true;
        }
    }

    if (op->op == CMD_WRITE_PREPARE) {
        WritePrepareOp *wop = static_cast<WritePrepareOp *> (op);
        assert(wop != NULL);
        // Payload bytes buffered after the (not yet consumed) header.
        const size_t nAvail = iobuf->BytesConsumable() - cmdLen;
        if (nAvail < wop->numBytes) {
            // if we don't have all the data for the write, hold on...
            if (! mCurOp) {
                if (wop->numBytes > gChunkManager.GetMaxIORequestSize()) {
                    KFS_LOG_VA_ERROR("bad write request size: %d, closing connection\n",
                        (int)wop->numBytes);
                    iobuf->Clear();
                    delete op;
                    // NOTE(review): this can delete this ClientSM
                    // (HandleTerminate does so once mOps is empty), while
                    // HandleRequest's read loop still reads mCurOp after we
                    // return.  TODO: report the failure to the caller instead.
                    HandleRequest(EVENT_NET_ERROR, NULL);
                    return true;
                }
                // Move write data to the start of the buffers, to make it
                // aligned. Normally only one buffer will be created.
                iobuf->Consume(cmdLen);
                const int off(wop->offset % IOBufferData::GetDefaultBufferSize());
                if (off > 0) {
                    IOBuffer buf;
                    buf.ReplaceKeepBuffersFull(iobuf, off, nAvail);
                    iobuf->Move(&buf);
                    iobuf->Consume(off);
                } else {
                    iobuf->MakeBuffersFull();
                }
                mCurOp = op;
            }
            mNetConnection->SetMaxReadAhead(wop->numBytes - nAvail);
            // we couldn't process the command...so, wait
            return false;
        }
        wop->dataBuf = new IOBuffer();
        if (nAvail != wop->numBytes || cmdLen > 0) {
            assert(nAvail >= wop->numBytes);
            iobuf->Consume(cmdLen);
            const int off(wop->offset % IOBufferData::GetDefaultBufferSize());
            if (nAvail == wop->numBytes && off <= 0) {
                iobuf->MakeBuffersFull();
                wop->dataBuf->Move(iobuf);
            } else {
                wop->dataBuf->ReplaceKeepBuffersFull(iobuf, off, wop->numBytes);
                wop->dataBuf->Consume(off);
            }
        } else {
            // The buffer holds exactly the payload: take it wholesale.
            wop->dataBuf->Move(iobuf);
        }
        mCurOp = 0;
        mNetConnection->SetMaxReadAhead(kMaxCmdHeaderLength);
    } else {
        string s = op->Show();

        KFS_LOG_VA_DEBUG("Got command: %s\n", s.c_str());

        iobuf->Consume(cmdLen);
    }

    if (op->op == CMD_WRITE_SYNC) {
        string clientIP = mNetConnection->GetPeerName();
        KFS_LOG_VA_INFO("Received write sync from %s: %s", clientIP.c_str(), op->Show().c_str());
        // make the write sync depend on the most recently queued write
        KfsOp *w = NULL;
        for (deque<KfsOp *>::reverse_iterator i = mOps.rbegin(); i != mOps.rend(); ++i) {
            if (((*i)->op == CMD_WRITE_PREPARE) || ((*i)->op == CMD_WRITE_PREPARE_FWD) ||
                ((*i)->op == CMD_WRITE)) {
                w = *i;
                break;
            }
        }
        if (w != NULL) {
            OpPair p;

            op->clnt = this;
            p.op = w;
            p.dependentOp = op;
            // Released and submitted by OpFinished() once w completes.
            mPendingOps.push_back(p);

            KFS_LOG_STREAM_DEBUG <<
                "Keeping write-sync (" << op->seq <<
                ") pending and depends on " << w->seq <<
            KFS_LOG_EOM;
            return true;
        }
        KFS_LOG_VA_DEBUG("Write-sync is being pushed down; no writes left... (%d ops left in q)",
                         (int)mOps.size());
    }

    mOps.push_back(op);

    op->clnt = this;
    gChunkServer.OpInserted();

    SubmitOp(op);

    return true;
}

///
/// Called as each op is retired from mOps.  Releases, in order, every
/// write-sync at the head of mPendingOps that was waiting on doneOp.  Each
/// released write-sync is appended to mOps and submitted for execution.
/// @param[in] doneOp The op that just finished.  The callers in this file
///                   delete it only after this returns.
///
void
ClientSM::OpFinished(KfsOp *doneOp)
{
    // multiple ops could be waiting for a single op to finish...
    while (!mPendingOps.empty()) {
        const OpPair p = mPendingOps.front();

        // Match by identity.  p.op is the very op object recorded when the
        // write-sync was parked, and the callers pass back the op popped
        // from mOps.  Seq values are presumably client-chosen and nothing
        // here enforces their uniqueness, so a seq match could release a
        // write-sync before its write has completed.
        if (p.op != doneOp) {
            break;
        }

        KFS_LOG_STREAM_DEBUG <<
            "Submitting write-sync (" << p.dependentOp->seq <<
            ") since " << p.op->seq << " finished" <<
        KFS_LOG_EOM;
        mOps.push_back(p.dependentOp);
        gChunkServer.OpInserted();
        // Dequeue before submitting, in case completion re-enters this object.
        mPendingOps.pop_front();
        SubmitOp(p.dependentOp);
    }
}

///
/// Look up the remote sync state machine for loc among this client's
/// daisy-chain peers (mRemoteSyncers).  When connect is true the shared
/// helper presumably creates/connects one if missing — confirm in
/// KFS::FindServer.
///
RemoteSyncSMPtr
ClientSM::FindServer(const ServerLocation &loc, bool connect)
{
    RemoteSyncSMPtr server = KFS::FindServer(mRemoteSyncers, loc, connect);
    return server;
}
